from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, row_number,avg, desc
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import Window
from pyspark.ml.feature import BucketedRandomProjectionLSH, Normalizer
import plotly.express as px
import seaborn as sns
# Create a SparkSession
# Build (or reuse) the SparkSession that drives every job in this script.
builder = SparkSession.builder.appName("Data Preprocessing")
spark = builder.getOrCreate()
24/05/09 22:53:43 WARN Utils: Your hostname, Soumitras-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.0.0.227 instead (on interface en0) 24/05/09 22:53:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 24/05/09 22:53:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
# Load the two raw CSV datasets from HDFS.
file_path1 = "hdfs://localhost:9000/datafolder/datazip/Books_rating.csv"
file_path2 = "hdfs://localhost:9000/datafolder/datazip/books_data.csv"
books_rating = spark.read.csv(file_path1, header=True, inferSchema=True)
books_data = spark.read.csv(file_path2, header=True, inferSchema=True)
# Keep only the columns used downstream.
rating_columns = ['Id', 'Title', 'User_id', 'review/score', 'Price']
metadata_columns = ['Title', 'authors', 'publisher', 'categories', 'publishedDate']
books_rating = books_rating.select(rating_columns)
books_data = books_data.select(metadata_columns)
# First-pass summary statistics as a sanity check on the raw data.
books_rating.describe().show()
books_data.describe().show()
24/05/09 22:53:51 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+-------+--------------------+--------------------+-------------------+------------------+--------------------+ |summary| Id| Title| User_id| review/score| Price| +-------+--------------------+--------------------+-------------------+------------------+--------------------+ | count| 3000000| 2999792| 2437750| 2999870| 482421| | mean|1.0568515696607149E9| 2012.796651763537| 18.29299003322259| 1656.860421970827| 21.767951161877054| | stddev| 1.284488524833734E9| 1536.7533549608797| 21.99284402625621|1427549.9863179324| 26.21155241772817| | min| 0001047604| """ Film technique| "" Film acting """| & Algorithms"""| "" and| | max| B0064P287I|you can do anythi...| AZZZZW74AAX75| thersites|: A guide to loca...| +-------+--------------------+--------------------+-------------------+------------------+--------------------+
[Stage 7:> (0 + 8) / 8]
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+ |summary| Title| authors| publisher| categories| publishedDate| +-------+--------------------+--------------------+--------------------+--------------------+--------------------+ | count| 212403| 181153| 139274| 171880| 186560| | mean| 3823.672941176471| 1578.4| 3734.75| 1983.7334777898159| 1982.702933143332| | stddev| 10717.999589636447| 1278.7901502106834| 10193.316327911616| 142.43423125699238| 37.65620052385513| | min| """ Film technique| "" ""I'm a Littl...| "" ""Skipper Ire...| "" Knox's quirky...| "" ""Cruising fo...| | max|you can do anythi...|” “Jeanie with th...| 펜립|�� folk art is a ...|” which is anthol...| +-------+--------------------+--------------------+--------------------+--------------------+--------------------+
# Extract only the year from the publishedDate column.
from pyspark.sql.functions import year
books_data = books_data.withColumn("publishedYear", year("publishedDate")).drop("publishedDate")
# Keep only rows whose extracted year is purely digits (drops nulls and
# malformed dates).  FIX: use a raw string -- "\d" in a plain string is an
# invalid escape sequence and raises a SyntaxWarning on recent Python.
books_data = books_data.filter(col("publishedYear").rlike(r"^\d+$"))
# Dropping duplicate rows in both the tables.
# FIX: DataFrames are immutable -- dropDuplicates() returns a NEW frame.
# The original code discarded the result, so both counts were identical
# and the script always reported 0 duplicates (see its own output).
count_original = books_rating.count()
books_rating = books_rating.dropDuplicates()
count_after = books_rating.count()
d1 = count_original - count_after
print("number of duplicates in books_rating:", d1)
count_original2 = books_data.count()
books_data = books_data.dropDuplicates()
count_after2 = books_data.count()
d2 = count_original2 - count_after2
print("number of duplicates in books_data:", d2)
number of duplicates in books_rating: 0 number of duplicates in books_data: 0
# Count null values per column.
# PERF: the original launched one full filter+count scan per column
# (len(columns) Spark jobs per table); this computes all columns in a
# single aggregate pass per table.
from pyspark.sql.functions import count as _count, when as _when

def _null_counts(df):
    """Return {column: number of null rows} computed in one Spark job."""
    row = df.select(
        [_count(_when(col(c).isNull(), c)).alias(c) for c in df.columns]
    ).collect()[0]
    return row.asDict()

null_counts_rating = _null_counts(books_rating)
null_counts_data = _null_counts(books_data)
print("Null values in books_rating dataset:", null_counts_rating)
print("Null values in books_data dataset:", null_counts_data)
Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled. Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize= [Stage 31:====================> (8 + 8) / 22]
CodeCache: size=131072Kb used=34992Kb max_used=35006Kb free=96079Kb bounds [0x0000000104934000, 0x0000000106ba4000, 0x000000010c934000] total_blobs=13261 nmethods=12319 adapters=854 compilation: disabled (not enough contiguous free space left)
Null values in books_rating dataset: {'Id': 0, 'Title': 208, 'User_id': 562250, 'review/score': 130, 'Price': 2517579}
Null values in books_data dataset: {'Title': 1, 'authors': 6839, 'publisher': 48179, 'categories': 15259, 'publishedYear': 0}
from pyspark.sql.functions import when, col, first
from pyspark.sql.window import Window
# Null out any Price value that does not parse as a number (the CSV is
# messy, so some Price cells contain stray text).  Note the column stays
# a string -- only unparseable values are replaced with null.
books_rating = books_rating.withColumn("Price", when(col("Price").cast("double").isNotNull(), col("Price")).otherwise(None))
books_rating.show()
# filling the null values in price column with the price of same books ( some users left the price columns blank)
# NOTE(review): ascending orderBy places nulls first, but first(..., True)
# ignores nulls, so every row of a title gets that title's first non-null
# price (if one exists) -- confirm this ordering assumption is intended.
window_spec = Window.partitionBy("Title").orderBy("Price")
books_rating = books_rating.withColumn("Price", first("Price", True).over(window_spec))
+----------+--------------------+--------------+------------+-----+ | Id| Title| User_id|review/score|Price| +----------+--------------------+--------------+------------+-----+ |1882931173|Its Only Art If I...| AVCGYZL8FQQTD| 4.0| NULL| |0826414346|Dr. Seuss: Americ...|A30TK6U7DNS82R| 5.0| NULL| |0826414346|Dr. Seuss: Americ...|A3UH4UZ4RSVO82| 5.0| NULL| |0826414346|Dr. Seuss: Americ...|A2MVUWT453QH61| 4.0| NULL| |0826414346|Dr. Seuss: Americ...|A22X4XUPKF66MR| 4.0| NULL| |0826414346|Dr. Seuss: Americ...|A2F6NONFUDB6UK| 4.0| NULL| |0826414346|Dr. Seuss: Americ...|A14OJS0VWMOSWO| 5.0| NULL| |0826414346|Dr. Seuss: Americ...|A2RSSXTDZDUSH4| 5.0| NULL| |0826414346|Dr. Seuss: Americ...|A25MD5I2GUIW6W| 5.0| NULL| |0826414346|Dr. Seuss: Americ...|A3VA4XFS5WNJO3| 4.0| NULL| |0829814000|Wonderful Worship...| AZ0IOBU20TBOP| 5.0|19.40| |0829814000|Wonderful Worship...|A373VVEU6Z9M0N| 5.0|19.40| |0829814000|Wonderful Worship...| AGKGOH65VTRR4| 5.0|19.40| |0829814000|Wonderful Worship...| A3OQWLU31BU1Y| 5.0|19.40| |0595344550|Whispers of the W...|A3Q12RK71N74LB| 1.0|10.95| |0595344550|Whispers of the W...|A1E9M6APK30ZAU| 4.0|10.95| |0595344550|Whispers of the W...| AUR0VA5H0C66C| 1.0|10.95| |0595344550|Whispers of the W...|A1YLDZ3VHR6QPZ| 5.0|10.95| |0595344550|Whispers of the W...| ACO23CG8K8T77| 5.0|10.95| |0595344550|Whispers of the W...|A1VK81CRRC7MLM| 5.0|10.95| +----------+--------------------+--------------+------------+-----+ only showing top 20 rows
# Remove every row that still contains a null, then re-verify the result.
books_rating = books_rating.dropna()
books_data = books_data.dropna()

def _count_nulls(frame):
    # Per-column null tally (one job per column).
    return {name: frame.filter(col(name).isNull()).count() for name in frame.columns}

null_counts_rating = _count_nulls(books_rating)
null_counts_data = _count_nulls(books_data)
print("Null values in books_rating dataset:", null_counts_rating)
print("Null values in books_data dataset:", null_counts_data)
books_rating.describe().show()
books_data.describe().show()
Null values in books_rating dataset: {'Id': 0, 'Title': 0, 'User_id': 0, 'review/score': 0, 'Price': 0}
Null values in books_data dataset: {'Title': 0, 'authors': 0, 'publisher': 0, 'categories': 0, 'publishedYear': 0}
+-------+--------------------+--------------------+--------------------+--------------------+-----------------+ |summary| Id| Title| User_id| review/score| Price| +-------+--------------------+--------------------+--------------------+--------------------+-----------------+ | count| 414180| 414180| 414180| 414180| 414180| | mean|1.0516446472336355E9| 1889.688679245283| NULL| 4.239725294240715|21.62184188033609| | stddev|1.0441895933760347E9| 124.2255524061775| NULL| 1.2911124349740133|26.24759579737497| | min| 0002554232|"""Beauty Shop-Ph...|A00117421L76WVWG4...| & Algorithms"""| 1.00| | max| B000TGB9VE|www.whitbread.org...| AZZZZW74AAX75|teach to understa...| 995.00| +-------+--------------------+--------------------+--------------------+--------------------+-----------------+ +-------+--------------------+------------------------+--------------------+--------------------+-----------------+ |summary| Title| authors| publisher| categories| publishedYear| +-------+--------------------+------------------------+--------------------+--------------------+-----------------+ | count| 112450| 112450| 112450| 112450| 112450| | mean| 1320.25| NULL| 51495.0| NULL|2002.091943085816| | stddev| 841.971623206931| NULL| NULL| NULL|14.00102150533584| | min|""" We'll Always ...| The New York Tim...| as shared with D...| Luther Vandross....| 1016| | max|you can do anythi...|['黒田領治', 'Ryōji K...| 펜립| ['Zoning']| 2023| +-------+--------------------+------------------------+--------------------+--------------------+-----------------+
# Removing special characters
from pyspark.sql.functions import udf, col, split, size, regexp_replace, initcap
from pyspark.sql.types import StringType
import re
from pyspark.sql.functions import year
def remove_special_characters(text):
    """Strip every character that is not a word character or whitespace.

    Passes None through unchanged so the Spark UDF stays null-safe.
    """
    if text is None:
        return None
    return re.sub(r'[^\w\s]', '', text)
# Wrap the Python cleaner as a Spark UDF and apply it to categories.
remove_special_characters_udf = udf(remove_special_characters, StringType())
books_data = books_data.withColumn("categories", remove_special_characters_udf("categories"))
# Keep only single-word categories; multi-word categories are dropped.
books_data = books_data.filter(size(split(col("categories"), " ")) == 1)
# NOTE(review): "\b\s+" removes whitespace following a word boundary in
# titles -- confirm this aggressive normalisation is intended.
books_data = books_data.withColumn("Title", regexp_replace(col("Title"), "\\b\\s+", ""))
# Title-case Title in BOTH tables so the later inner join on Title matches.
books_data = books_data.withColumn("Title", initcap(col("Title")))
books_rating = books_rating.withColumn("Title", initcap(col("Title")))
books_data = books_data.withColumn("categories", initcap(col("categories")))
def remove_integers_and_special_characters(text):
    """Keep only ASCII letters and whitespace; None passes through (null-safe)."""
    return re.sub(r'[^a-zA-Z\s]', '', text) if text is not None else None
# Apply the letters-only cleaner to the categories column via a UDF.
remove_integers_and_special_characters_udf = udf(remove_integers_and_special_characters, StringType())
books_data = books_data.withColumn("categories", remove_integers_and_special_characters_udf("categories"))
# Keep rows whose category is now purely letters/whitespace.
# FIX: raw string -- "\s" in a plain string literal is an invalid escape
# sequence (SyntaxWarning on recent Python).
books_data = books_data.filter(books_data["categories"].rlike(r"^[a-zA-Z\s]+$"))
# Renaming columns for convenience.
books_rating = books_rating.withColumnRenamed("review/score", "rating")
books_rating.describe().show()
[Stage 109:======> (1 + 8) / 9]
+-------+--------------------+--------------------+--------------------+--------------------+-----------------+ |summary| Id| Title| User_id| rating| Price| +-------+--------------------+--------------------+--------------------+--------------------+-----------------+ | count| 414180| 414180| 414180| 414180| 414180| | mean|1.0516446472336355E9| 1889.688679245283| NULL| 4.239725294240715|21.62184188033609| | stddev|1.0441895933760347E9| 124.2255524061775| NULL| 1.2911124349740133|26.24759579737497| | min| 0002554232|"""beauty Shop-ph...|A00117421L76WVWG4...| & Algorithms"""| 1.00| | max| B000TGB9VE|Zulu Shaman: Drea...| AZZZZW74AAX75|teach to understa...| 995.00| +-------+--------------------+--------------------+--------------------+--------------------+-----------------+
# Number of ratings submitted by each user, most active first.
user_id_counts = (
    books_rating.groupBy("User_id")
    .agg(count("*").alias("count"))
    .orderBy(desc("count"))
)
user_id_counts.show(20)
# Histogram of users bucketed by tens of ratings (0-9, 10-19, ...).
bucket = (col('count') / 10).cast('int').alias('range')
result = user_id_counts.groupBy(bucket).count().orderBy('range')
result.show()
+--------------+-----+ | User_id|count| +--------------+-----+ |A14OJS0VWMOSWO| 2105| | AFVQZQ8PW0L| 606| | AG35NEEFCMQVR| 307| |A1M8PP7MLHNBQB| 278| |A1D2C0WDCSHUWZ| 271| | AHD101501WCN1| 242| |A2VKWLCNZF4ZVB| 205| |A1NATT3PN24QWY| 200| |A1K1JW1C5CUSUZ| 179| |A1X8VZWTOG8IS6| 174| |A1S3C5OFU508P3| 158| |A3M174IC0VXOS2| 152| |A2EDZH51XHFA9B| 147| |A2VE83MZF98ITY| 143| |A21NVBFIEQWDSG| 142| |A2NJO6YE954DBH| 141| |A2OJW07GQRNJUT| 129| |A2F6N60Z96CAJI| 118| |A1OX82JPAQLL60| 113| |A1G37DFO8MQW0M| 112| +--------------+-----+ only showing top 20 rows
[Stage 121:============> (2 + 7) / 9]
+-----+------+ |range| count| +-----+------+ | 0|303448| | 1| 1213| | 2| 250| | 3| 126| | 4| 47| | 5| 33| | 6| 16| | 7| 10| | 8| 6| | 9| 7| | 10| 8| | 11| 5| | 12| 1| | 14| 4| | 15| 2| | 17| 2| | 20| 2| | 24| 1| | 27| 2| | 30| 1| +-----+------+ only showing top 20 rows
# Drop malformed user ids (anything not purely alphanumeric).
user_id_counts = user_id_counts.where(col("user_id").rlike("^[a-zA-Z0-9]+$"))
user_id_counts.show(20)
# Users with at least one rating.  The threshold of 1 currently keeps
# everyone; it is a hook for tightening later.
filtered_user_ids = user_id_counts.where(col("count") >= 1).select("User_id")
filtered_user_ids.describe().show()
+--------------+-----+ | User_id|count| +--------------+-----+ |A14OJS0VWMOSWO| 2105| | AFVQZQ8PW0L| 606| | AG35NEEFCMQVR| 307| |A1M8PP7MLHNBQB| 278| |A1D2C0WDCSHUWZ| 271| | AHD101501WCN1| 242| |A2VKWLCNZF4ZVB| 205| |A1NATT3PN24QWY| 200| |A1K1JW1C5CUSUZ| 179| |A1X8VZWTOG8IS6| 174| |A1S3C5OFU508P3| 158| |A3M174IC0VXOS2| 152| |A2EDZH51XHFA9B| 147| |A2VE83MZF98ITY| 143| |A21NVBFIEQWDSG| 142| |A2NJO6YE954DBH| 141| |A2OJW07GQRNJUT| 129| |A2F6N60Z96CAJI| 118| |A1OX82JPAQLL60| 113| |A281NPSIMI1C2R| 112| +--------------+-----+ only showing top 20 rows
[Stage 147:> (0 + 1) / 1]
+-------+--------------------+ |summary| User_id| +-------+--------------------+ | count| 305186| | mean| NULL| | stddev| NULL| | min|A00117421L76WVWG4...| | max| AZZZZW74AAX75| +-------+--------------------+
# Restrict the ratings to validated users, then attach book metadata.
filtered_ratings = books_rating.join(filtered_user_ids, on="User_id", how="inner")
books_merged = filtered_ratings.join(books_data, on="Title", how="inner")
# Guarantee the rating column is numeric for the aggregations below.
books_final = books_merged.withColumn("rating", col("rating").cast("double"))
books_final.show()
+-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+ | Title| User_id| Id|rating|Price| authors| publisher|categories|publishedYear| +-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+ | Herland|A101DG7P9E26PW|1419123548| 4.0|17.12|['Charlotte Perki...| Xist Publishing| Fiction| 2015| | Herland|A101DG7P9E26PW|1421810182| 4.0|17.12|['Charlotte Perki...| Xist Publishing| Fiction| 2015| | Spellbound|A10VOEBL5S337W|014250193X| 4.0| 5.99| ['James Essinger']| Delta| History| 2007| | Plainsong|A111DVWFAZPOO1|0375705856| 1.0|10.20| ['Kent Haruf']| Vintage| Fiction| 2001| | Hannibal|A11ZDEVYIMC6AI|0440224675| 2.0| 7.67| ['Thomas Harris']| Dell| Fiction| 2009| | Hannibal|A11ZDEVYIMC6AI|0440224675| 2.0| 7.67| ['Thomas Harris']| Dell| Fiction| 2009| | Riddle-master|A11ZRTRRYD9P2W|0441005969| 5.0|12.92|['Patricia A. McK...| Penguin| Fiction| 1999| | Vox|A1241U6QCSX5YJ|0679742115| 3.0|10.95|['Christina Dalch...| Boekerij| Fiction| 2019| | Insurrection|A124MIARA7T5J9|0671720244| 3.0| 7.56| ['Peter Rollins']| Simon and Schuster| Religion| 2011| | Hamlet|A125T7DGLW3GPP|0764120840| 5.0| 7.06|['William Shakesp...| Bantam Classics| Drama| 1988| |Lovin' Mrs. Jones|A129029D0E7ADN|0972458603| 3.0|11.66|['Edward Dean Arn...|Pearlstone Pub In...| Fiction| 2003| | Riddle-master|A12HYMIF3856GN|0441005969| 5.0|12.92|['Patricia A. McK...| Penguin| Fiction| 1999| | Superpatriotism|A12J7R7GAJ5FBZ|0872864332| 5.0| 9.95| ['Michael Parenti']| City Lights Books| History| 2004| | Providence|A12J7R7GAJ5FBZ|0595197809| 5.0|15.95| ['Caroline Kepnes']| Lenny| Fiction| 2018| | Potluck|A12UQKP0MU09LN|1892590379| 5.0|14.95|['Kristin Donnelly']| Clarkson Potter| Cooking| 2016| | Drakon|A12X1VN7QQNPPC|0671877119| 3.0| 5.99| ['N.J. 
Walters']| Entangled: Amara| Fiction| 2018| | Bellwether|A131JVFO4PJKPE|0553562967| 5.0| 7.56| ['Connie Willis']| Spectra| Fiction| 2010| | Hamlet|A13A206PIDTBQK|0764120840| 5.0| 7.06|['William Shakesp...| Bantam Classics| Drama| 1988| | Plainsong|A13F2H4DIX231F|0375705856| 5.0|10.20| ['Kent Haruf']| Vintage| Fiction| 2001| | Insurrection|A13WA3SCM778LO|0671720244| 3.0| 7.56| ['Peter Rollins']| Simon and Schuster| Religion| 2011| +-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+ only showing top 20 rows
# Summary statistics of the merged, cleaned dataset.
books_final.describe().show()
[Stage 180:=====================> (3 + 5) / 8]
+-------+-----+--------------+--------------------+------------------+------------------+--------------------+----------------+----------+-----------------+ |summary|Title| User_id| Id| rating| Price| authors| publisher|categories| publishedYear| +-------+-----+--------------+--------------------+------------------+------------------+--------------------+----------------+----------+-----------------+ | count| 6600| 6600| 6600| 6571| 6600| 6600| 6600| 6600| 6600| | mean| NULL| NULL| 6.845429041481897E8| 3.721503576320195|18.180798484848523| NULL| NULL| NULL|2007.634696969697| | stddev| NULL| NULL|4.6595010557118016E8|1.4230303776831768|55.546915116959966| NULL| NULL| NULL|6.783271168969406| | min| 51a|A100I0T791DIKS| 0007104022| 1.0| 10.00|"[""John O'Loughl...| 010 Publishers| Animals| 1943| | max|Zoom!| AZYIWJ4P9VZVE| B000MTEKTQ| 5.0| 96.80| ['Yasushi Inoue']|powerHouse Books| Travel| 2022| +-------+-----+--------------+--------------------+------------------+------------------+--------------------+----------------+----------+-----------------+
import matplotlib.pyplot as plt
import numpy as np
import plotly.graph_objs as go
import pandas as pd

# Review count per rating value, collected to the driver for plotting.
temp_df = books_final.groupBy("rating").count().toPandas()
# Orange bar chart of the rating distribution.
rating_bar = go.Bar(
    x=temp_df['rating'],
    y=temp_df['count'],
    text=temp_df['count'],  # label each bar with its count
    marker=dict(color='rgb(255,165,0)', line=dict(color='rgb(0,0,0)', width=1.5)),
)
fig_bar = go.Figure(
    data=[rating_bar],
    layout=go.Layout(
        template="plotly_dark",
        title='RATINGS COUNT',
        xaxis=dict(title='Rating'),
        yaxis=dict(title='Count'),
    ),
)
fig_bar.show()
import matplotlib.pyplot as plt
import numpy as np
import plotly.graph_objs as go
import pandas as pd

# Drop the null-rating bucket before charting.
temp_df_filtered = temp_df.dropna(subset=['rating'])

def pie_plot(cnt_srs, title):
    """Build a dark-themed donut chart from a frame with 'rating'/'count' columns."""
    shades = plt.cm.viridis_r(np.linspace(0, 1, len(cnt_srs)))
    donut = go.Pie(
        labels=cnt_srs['rating'],
        values=cnt_srs['count'],
        hoverinfo='percent+value',
        textinfo='percent',
        textposition='inside',
        hole=0.7,
        showlegend=True,
        marker=dict(colors=shades, line=dict(color='#000000', width=2)),
    )
    return go.Figure(data=[donut], layout=go.Layout(template="plotly_dark", title=title))

fig_pie = pie_plot(temp_df_filtered, 'Rating Distribution')
fig_pie.show()
from pyspark.sql.functions import col, when

# Tally Fiction vs Non-Fiction ratings via indicator columns summed in one pass.
flagged = (
    books_final
    .withColumn("Fiction", when(col("categories") == "Fiction", 1).otherwise(0))
    .withColumn("Non-Fiction", when(col("categories") != "Fiction", 1).otherwise(0))
)
genre_counts = (
    flagged.groupBy().sum("Fiction", "Non-Fiction")
    .withColumnRenamed("sum(Fiction)", "Fiction")
    .withColumnRenamed("sum(Non-Fiction)", "Non-Fiction")
)
genre_counts_pd = genre_counts.toPandas()
fiction_label = genre_counts_pd.iloc[0]["Fiction"]
non_fiction_label = genre_counts_pd.iloc[0]["Non-Fiction"]
trace1 = go.Bar(
    x=["Fiction", "Non-Fiction"],
    y=[fiction_label, non_fiction_label],
    text=[fiction_label, non_fiction_label],  # label each bar
    marker=dict(color='rgb(255,165,0)', line=dict(color='rgb(0,0,0)', width=1.5)),
)
layout = go.Layout(
    template="plotly_dark",
    title='Count of ratings among Fiction and Non-Fiction Books',
    xaxis=dict(title='Categories'),
    yaxis=dict(title='Count'),
)
fig = go.Figure(data=[trace1], layout=layout)
fig.show()
import plotly.graph_objs as go
from pyspark.sql.functions import col, when

# Collapse categories to a binary Fiction / Non-Fiction label.
genre_counts = books_final.withColumn(
    "Category",
    when(col("categories") == "Fiction", "Fiction").otherwise("Non-Fiction"),
)
genre_counts_pd = genre_counts.groupBy('Category').count().toPandas()

def pie_plot(cnt_srs, title):
    """Donut chart of a frame with 'Category'/'count' columns."""
    palette = plt.cm.viridis_r(np.linspace(0, 1, len(cnt_srs)))
    wedge = go.Pie(
        labels=cnt_srs['Category'],
        values=cnt_srs['count'],
        hoverinfo='percent+value',
        textinfo='percent',
        textposition='inside',
        hole=0.7,
        showlegend=True,
        marker=dict(colors=palette, line=dict(color='#000000', width=2)),
    )
    return go.Figure(data=[wedge], layout=go.Layout(template="plotly_dark", title=title))

fig_pie = pie_plot(genre_counts_pd, 'GENRE Distribution')
fig_pie.show()
!pip install bubbly
DEPRECATION: Loading egg at /Users/soumitra7/anaconda3/lib/python3.11/site-packages/pyBWMD-0.0.1-py3.11.egg is deprecated. pip 23.3 will enforce this behaviour change. A possible replacement is to use pip for package installation.. Requirement already satisfied: bubbly in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (1.0.2) Requirement already satisfied: plotly in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from bubbly) (5.9.0) Requirement already satisfied: pandas in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from bubbly) (2.0.3) Requirement already satisfied: python-dateutil>=2.8.2 in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from pandas->bubbly) (2.8.2) Requirement already satisfied: pytz>=2020.1 in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from pandas->bubbly) (2023.3.post1) Requirement already satisfied: tzdata>=2022.1 in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from pandas->bubbly) (2023.3) Requirement already satisfied: numpy>=1.21.0 in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from pandas->bubbly) (1.24.3) Requirement already satisfied: tenacity>=6.2.0 in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from plotly->bubbly) (8.2.2) Requirement already satisfied: six>=1.5 in /Users/soumitra7/anaconda3/lib/python3.11/site-packages (from python-dateutil>=2.8.2->pandas->bubbly) (1.16.0)
from pyspark.sql import functions as F
from plotly.subplots import make_subplots
import plotly.graph_objs as go
import pandas as pd

# Mean rating and mean price per (category, year), collected for plotting.
df1 = (
    books_final.groupBy(['categories', 'publishedYear'])
    .agg(F.mean('rating').alias('User Rating'), F.mean('Price').alias('Price'))
    .toPandas()
)
fig = make_subplots(rows=1, cols=1)
# One marker trace per category so the legend toggles genres individually.
for genre in df1['categories'].unique():
    subset = df1[df1['categories'] == genre]
    fig.add_trace(
        go.Scatter(
            x=subset['User Rating'],
            y=subset['Price'],
            mode='markers',
            marker=dict(size=10),
            name=genre,
        )
    )
fig.update_layout(
    template="plotly_dark",
    title='Avg price and rating of various categories',
    xaxis_title='User Rating',
    yaxis_title='Avg Price',
    xaxis=dict(type='log'),  # Log scale for x-axis
    showlegend=True,
)
fig.show()
from pyspark.sql import functions as F
from plotly.subplots import make_subplots
import plotly.graph_objs as go
import pandas as pd

# Mean rating and mean price per publication year.
df1 = books_final.groupBy(['publishedYear']).agg(F.mean('rating').alias('User Rating'), F.mean('Price').alias('Price')).toPandas()
fig = make_subplots(rows=1, cols=1)
# FIX: the loop variable was named `year`, which shadowed the pyspark
# `year()` function imported earlier in this script; renamed to `yr`.
for yr in df1['publishedYear'].unique():
    df_year = df1[df1['publishedYear'] == yr]
    fig.add_trace(go.Scatter(
        x=df_year['User Rating'],
        y=df_year['Price'],
        mode='markers',
        marker=dict(size=10),
        name=str(yr)
    ))
fig.update_layout(
    template="plotly_dark",
    title='Avg price and rating over the years',
    xaxis_title='User Rating',
    yaxis_title='Avg Price',
    xaxis=dict(type='log'),  # Log scale for x-axis
    showlegend=True
)
fig.show()
# Top ten authors ranked by mean rating.
# NOTE(review): no minimum-review threshold, so an author with a single
# 5-star review can top this chart.
top_authors_pd = (
    books_final.groupBy('authors')
    .agg(F.avg('rating').alias("author_rating"))
    .orderBy(desc('author_rating'))
    .limit(10)
    .toPandas()
)
trace1 = go.Bar(
    x=top_authors_pd['authors'],
    y=top_authors_pd['author_rating'],
    marker=dict(color='rgb(255,165,0)', line=dict(color='rgb(0,0,0)', width=1.5)),
)
layout = go.Layout(
    template="plotly_dark",
    title='TOP 10 AUTHORS WITH HIGH AVERAGE RATING',
    xaxis=dict(title='Author', tickangle=45),
    yaxis=dict(title='Avg rating'),
)
fig = go.Figure(data=[trace1], layout=layout)
fig.show()
# Top ten publishers ranked by mean rating (no minimum review count, so
# tiny publishers with one high rating can dominate).
top_publishers_pd = (
    books_final.groupBy('publisher')
    .agg(F.avg('rating').alias("publisher_rating"))
    .orderBy(desc('publisher_rating'))
    .limit(10)
    .toPandas()
)
trace1 = go.Bar(
    x=top_publishers_pd['publisher'],
    y=top_publishers_pd['publisher_rating'],
    marker=dict(color='rgb(255,165,0)', line=dict(color='rgb(0,0,0)', width=1.5)),
)
layout = go.Layout(
    template="plotly_dark",
    title='TOP 10 PUBLISHERS WITH HIGH AVERAGE RATING',
    xaxis=dict(title='publisher', tickangle=45),
    yaxis=dict(title='Avg rating'),
)
fig = go.Figure(data=[trace1], layout=layout)
fig.show()
# Mean of every numeric column per publication year; the chart uses avg(rating).
temp_df1_year_pd = books_final.groupBy('publishedYear').mean().toPandas()
bar_year = go.Bar(
    x=temp_df1_year_pd['publishedYear'],
    y=temp_df1_year_pd['avg(rating)'],
    marker=dict(color='rgb(255,165,0)', line=dict(color='rgb(0,0,0)', width=1.5)),
)
fig_year = go.Figure(
    data=[bar_year],
    layout=go.Layout(
        template="plotly_dark",
        title='AVERAGE REVIEWS OVER THE YEARS',
        xaxis=dict(title='Year'),
        yaxis=dict(title='count'),
    ),
)
fig_year.show()
from pyspark.sql import functions as F
import plotly.graph_objs as go
import pandas as pd

# Average price per publication year.
temp_df_year_pd = (
    books_final.groupBy('publishedYear')
    .agg(F.mean('Price').alias('avg_price'))
    .toPandas()
)
price_bar = go.Bar(
    x=temp_df_year_pd['publishedYear'],
    y=temp_df_year_pd['avg_price'],
    marker=dict(color='rgb(148, 103, 189)', line=dict(color='rgb(0,0,0)', width=1.5)),
)
fig_year = go.Figure(
    data=[price_bar],
    layout=go.Layout(
        template="plotly_dark",
        title='AVERAGE PRICE OVER THE YEARS',
        xaxis=dict(title='Year'),
        yaxis=dict(title='Average Price'),
    ),
)
fig_year.show()
# Final look at the merged dataset before building the recommender.
books_final.show(20)
24/05/09 23:00:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
+-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+ | Title| User_id| Id|rating|Price| authors| publisher|categories|publishedYear| +-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+ | Herland|A101DG7P9E26PW|1419123548| 4.0|17.12|['Charlotte Perki...| Xist Publishing| Fiction| 2015| | Herland|A101DG7P9E26PW|1421810182| 4.0|17.12|['Charlotte Perki...| Xist Publishing| Fiction| 2015| | Spellbound|A10VOEBL5S337W|014250193X| 4.0| 5.99| ['James Essinger']| Delta| History| 2007| | Plainsong|A111DVWFAZPOO1|0375705856| 1.0|10.20| ['Kent Haruf']| Vintage| Fiction| 2001| | Hannibal|A11ZDEVYIMC6AI|0440224675| 2.0| 7.67| ['Thomas Harris']| Dell| Fiction| 2009| | Hannibal|A11ZDEVYIMC6AI|0440224675| 2.0| 7.67| ['Thomas Harris']| Dell| Fiction| 2009| | Riddle-master|A11ZRTRRYD9P2W|0441005969| 5.0|12.92|['Patricia A. McK...| Penguin| Fiction| 1999| | Vox|A1241U6QCSX5YJ|0679742115| 3.0|10.95|['Christina Dalch...| Boekerij| Fiction| 2019| | Insurrection|A124MIARA7T5J9|0671720244| 3.0| 7.56| ['Peter Rollins']| Simon and Schuster| Religion| 2011| | Hamlet|A125T7DGLW3GPP|0764120840| 5.0| 7.06|['William Shakesp...| Bantam Classics| Drama| 1988| |Lovin' Mrs. Jones|A129029D0E7ADN|0972458603| 3.0|11.66|['Edward Dean Arn...|Pearlstone Pub In...| Fiction| 2003| | Riddle-master|A12HYMIF3856GN|0441005969| 5.0|12.92|['Patricia A. McK...| Penguin| Fiction| 1999| | Superpatriotism|A12J7R7GAJ5FBZ|0872864332| 5.0| 9.95| ['Michael Parenti']| City Lights Books| History| 2004| | Providence|A12J7R7GAJ5FBZ|0595197809| 5.0|15.95| ['Caroline Kepnes']| Lenny| Fiction| 2018| | Potluck|A12UQKP0MU09LN|1892590379| 5.0|14.95|['Kristin Donnelly']| Clarkson Potter| Cooking| 2016| | Drakon|A12X1VN7QQNPPC|0671877119| 3.0| 5.99| ['N.J. 
Walters']| Entangled: Amara| Fiction| 2018| | Bellwether|A131JVFO4PJKPE|0553562967| 5.0| 7.56| ['Connie Willis']| Spectra| Fiction| 2010| | Hamlet|A13A206PIDTBQK|0764120840| 5.0| 7.06|['William Shakesp...| Bantam Classics| Drama| 1988| | Plainsong|A13F2H4DIX231F|0375705856| 5.0|10.20| ['Kent Haruf']| Vintage| Fiction| 2001| | Insurrection|A13WA3SCM778LO|0671720244| 3.0| 7.56| ['Peter Rollins']| Simon and Schuster| Religion| 2011| +-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+ only showing top 20 rows
# Pivot to a title-by-user matrix: one row per Title, one column per
# User_id, cell = that user's average rating for the title.
book_pivot = books_final.groupBy("Title").pivot("User_id").avg("rating")
# Fill Null values with 0 (unrated user/book pairs).
book_pivot = book_pivot.na.fill(0)
# Convert pivot table to RDD of (Title, features) tuples.
# NOTE(review): Vectors.dense materialises one dense entry per user for
# every title -- memory-hungry as the user count grows; a sparse vector
# would scale better.  Confirm the "sparse" names here are aspirational.
sparse_rdd = book_pivot.rdd.map(lambda row: (row[0], Vectors.dense(row[1:])))
# Define a schema for the RDD.
schema = ["Title", "features"]
# Create a DataFrame from the RDD.
sparse_matrix = spark.createDataFrame(sparse_rdd, schema)
sparse_matrix.show()
# Normalize each feature vector (Euclidean norm per Spark's Normalizer
# default) so LSH distances compare rating patterns, not raw magnitudes.
normalizer = Normalizer(inputCol="features", outputCol="norm_features")
norm_features = normalizer.transform(sparse_matrix)
# Bucketed Random Projection LSH model for approximate nearest-neighbour
# search over the normalised rating vectors.
brp = BucketedRandomProjectionLSH(inputCol="norm_features", outputCol="hashes", bucketLength=1.0, numHashTables=10)
model = brp.fit(norm_features)
24/05/09 23:00:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/09 23:00:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/09 23:00:45 WARN DAGScheduler: Broadcasting large task binary with size 1435.8 KiB
24/05/09 23:00:51 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/05/09 23:00:55 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
+-----------------+--------------------+ | Title| features| +-----------------+--------------------+ | Nemesis|[0.0,0.0,0.0,0.0,...| | Bite|[0.0,0.0,1.0,0.0,...| | Mercy|[0.0,0.0,0.0,0.0,...| | Romance|[0.0,0.0,0.0,0.0,...| | Guyana|[0.0,0.0,0.0,0.0,...| | Hostas|[5.0,0.0,0.0,0.0,...| | Believe|[0.0,0.0,0.0,0.0,...| | Rockbound|[0.0,0.0,0.0,0.0,...| | Restoree|[0.0,0.0,0.0,0.0,...| | Approaching...|[0.0,0.0,0.0,0.0,...| | Riddle-master|[0.0,0.0,0.0,0.0,...| | Borderlands|[0.0,0.0,0.0,0.0,...| | Nettie|[0.0,0.0,0.0,0.0,...| | Art|[0.0,0.0,0.0,0.0,...| |Lighthousekeeping|[0.0,0.0,0.0,0.0,...| | Saucer|[0.0,0.0,0.0,0.0,...| | Herland|[0.0,4.0,0.0,0.0,...| | Grievances|[0.0,0.0,0.0,0.0,...| | Jurgen|[0.0,0.0,0.0,0.0,...| | Rawsome!|[0.0,0.0,0.0,0.0,...| +-----------------+--------------------+ only showing top 20 rows
24/05/09 23:00:58 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB 24/05/09 23:01:00 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
# Read the book title to recommend against from the user.
user_input_title = input("Enter the title of the book: ")
# Title-case the input to line up with the initcap() normalisation applied
# to Title earlier.  NOTE(review): str.title() and Spark's initcap can
# differ on punctuation-adjacent words -- confirm for edge-case titles.
user_input_title = user_input_title.title()
Enter the title of the book: bite
# Look up the requested title and print five similar books.
if book_pivot.filter(col("Title") == user_input_title).count() == 0:
    print(f"Book '{user_input_title}' not found.")
else:
    # Normalised feature vector of the requested title.
    match = norm_features.filter(col("Title") == user_input_title)
    input_book_features = match.select("norm_features").collect()[0][0]
    # Ask for six neighbours: the query book itself plus five candidates.
    knn = model.approxNearestNeighbors(norm_features, input_book_features, 6)
    # Drop the query title, keep five recommendations, and print them.
    recommended_books = knn.filter(col("Title") != user_input_title).limit(5).collect()
    for book in recommended_books:
        print(book["Title"])
24/05/09 23:01:45 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB 24/05/09 23:01:48 WARN DAGScheduler: Broadcasting large task binary with size 4.6 MiB [Stage 465:> (0 + 1) / 1]
Jumpmetrics Dreamspy Scarab-4 Kunma Sensation